#!/usr/bin/python -O

import sys, os
import time

from threading import Thread
import qpid.messaging.exceptions

from amqp import RoutingKey, Consumer
from protomsg import GpbMessage

def dispatch_msg(msg, output=sys.stdout):
    if msg.subject:
        sKey = msg.subject
    else:
        sKey = msg.properties.get('x-amqp-0-10.routing-key')

    print 'Received event with routing key:', sKey

    if msg.content_type != 'application/x-protobuf':
        print 'Not a google protobuf message'
        return

    if output != sys.stdout:
        output.write('/@ routing key\n%s\n@/\n' % sKey)

    rKey = RoutingKey(sKey)

    pMsg = GpbMessage(opts.protoSrc,
                      rKey.GetPackage(),
                      rKey.GetPublicationName())

    from google.protobuf.message import DecodeError
    try:
        pMsg.loads(msg.content)

        aMsg = GpbMessage(opts.protoSrc,
                          rKey.GetPackage(),
                          rKey.GetPublicationName())
        outTpa = pMsg.dumptabs()
        aMsg.loadtabs(outTpa)

        if outTpa != aMsg.dumptabs():
            raise AssertionError("Error for tpa:%s", rKey)

        bMsg = GpbMessage(opts.protoSrc,
                          rKey.GetPackage(),
                          rKey.GetPublicationName())
        outJson = pMsg.dumpjson()
        bMsg.loadjson(outJson)

        if outJson != bMsg.dumpjson():
            raise AssertionError("Error for json:%s", rKey)

        if __debug__ or output != sys.stdout:
            output.write('/@ Tpa string\n%s\n@/\n/=\n%s=/\n\n' % (outTpa, pMsg))
            output.flush()

    except DecodeError:
        print 'Decode error for key: %s' % sKey
        output.write('Decode error for key: %s\n' % sKey)
    except Exception, e:
        print e


class Getter(Thread):
    def __init__(self, broker, exchange, bindingKeys, output=sys.stdout):
        super(Getter, self).__init__()

        self.__broker = broker
        self.__exchange = exchange
        self.__bindingKeys = bindingKeys
        self.__output = output

        self.__exit = False
        self.__receiver = None


    def run(self):
        if __debug__: print self.name, 'Start'

        while not self.__exit:
            try:
                if not self.__receiver:
                    self.__receiver = Consumer(self.__broker,
                                           self.__exchange,
                                           self.__bindingKeys)

                msg = self.__receiver.fetch(15)

                dispatch_msg(msg, self.__output)

            except qpid.messaging.exceptions.ConnectError:
                print '%s Could not connect to  %s, try again...' % (
                        self.name, self.__broker)
                time.sleep(5)

            except qpid.messaging.exceptions.ConnectionError:
                print 'Lost connection to %s, try again...' % self.__broker
                self.__receiver = None
                time.sleep(5)

            except qpid.messaging.exceptions.Empty:
                if __debug__:
                    print 'No message, waiting...'

        if __debug__: print self.name, 'Finished'


    def join(self, timeout=None):
        self.__exit = True
        if __debug__: print 'Stoping', self.name
        super(Getter, self).join(timeout)



# -- main start here --
from optparse import OptionParser
parser = OptionParser()
parser.add_option('-b', '--broker', dest = 'brokers', action='append',
      help='one or more address of the broker to subscribe')
parser.add_option('--proto-src', dest ='protoSrc', action='store',
      default = os.path.join(os.environ['HOME'], 'publications'),
      help='directory to store subscribe, default [%default]')
parser.add_option('-o', '--output', dest ='out',
      action='store', default = sys.stdout,
      help='file to dump message text, default [stdout]')
parser.add_option('-k', '--binding-key', dest = 'bindingKeys',
      action='append',
      help='routing keys to subscribe')

(opts, args) = parser.parse_args()

if opts.bindingKeys is None: opts.bindingKeys = ['#']

if __debug__:
    print opts.brokers
    print opts.bindingKeys

exchange = 'amq.topic'

if opts.out == sys.stdout:
    output = sys.stdout
else:
    output = open(opts.out, 'w')

threads = [Getter(broker, exchange, opts.bindingKeys, output)
              for broker in opts.brokers]


try:
    for thread in threads: thread.start()

    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print '\nControl-C pressed, exiting...'

except qpid.messaging.exceptions.ConnectionError:
    print 'Lost connection, exit...'

except qpid.messaging.exceptions.MalformedAddress:
    print 'Malformed routingkey'

#finally:
for thread in threads:
    print 'Terminate', thread.name
    thread.join()

if output != sys.stdout: output.close()


